Lambda(python3.6)を使って大量のデータをDynamoDBに追加するときはbatch_writerが便利
どうも!大阪オフィスの西村祐二です。
Lambdaを使ってファイルに記載された大量のデータをDynamoDBに追加する機会がありました。
実行時間の制限もあるため、効率よく追加できないかなと思っていろいろ探していたら
batch_writer()
を使えば
プロセスのスピードアップとサービスに対する書き込み要求数の削減が可能とのこと
だったので試してみました。
http://boto3.readthedocs.io/en/latest/guide/dynamodb.html#batch-writing
細かなところではありますが備忘録も兼ねて、ブログとして残しておこうと思います。
便利だと思ったところ
一度にputできる数の制限は気にしなくていい
一度にputできる数は最大25と制限があります。
http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Limits.html
以前、DynamoDBを勉強したときにこの制限のことは知っていたので、
今回「プログラムどうかこうかな~」と思っていましたが
batch_writer()
を使えば、特に気にせず裏でよしなにやってくれていました。
Request重複時に上書きしてくれる
BatchWriteItemの実行時、 同じアイテムへのRequestが重複すると
botocore.exceptions.ClientError: An error occurred (ValidationException)
のようなエラーがでるのですが、
こちらもbatch_writer(overwrite_by_pkeys=['partition_key', 'sort_key'])
のように、記載しておけば重複時に上書きしてくれます。
また、BatchWriteItemについて下記ブログがとても参考になります。
試してみる
設定
■Lambda ・ランタイム:python3.6 ・IAM:DynamoDBへのアクセス、BatchWriteItemの許可
■DynamoDB ・テーブル名:test-batch 今回わかりやすく、ブログのアクセス数管理を想定したテーブルにしてみました。 プライマリーキーを「user_name」、ソートキーを「blog」、アクセス数として「count」としています。
シナリオ:50アイテム追加してみる
制限25個ですが user1がblog1~blog50の50本書いたとして、そのアクセス数を追加してみます。
batch_writer()
を使った処理の部分は17行目から25行目です。
"""This is sample program.""" import logging import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) DYNAMO = boto3.resource('dynamodb') def lambda_handler(event, context): """ DynamoDB put item """ try: table_name = "test-batch" table = DYNAMO.Table(table_name) with table.batch_writer() as batch: for i in range(50): batch.put_item( Item={ 'user_name': 'user1', 'blog': 'blog' + str(i), 'count': i, } ) LOGGER.info("Completed registration") return "end" except Exception as error: LOGGER.error(error) raise error
問題なく処理は完了しテーブルにデータが追加されています。
シナリオ:同じアイテムに重複requestしてみる
BatchWriteItem内でrequestが重複するように下記の処理を行ってみます。 ・put_item:user1, blog1, 111 ・put_item:user1, blog1, 222 ・delete_item:user1, blog2 ・put_item:user1, blog2, 333
■失敗させてみる 比較するために、わざと失敗させてみたいと思います。
"""This is sample program.""" import logging import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) DYNAMO = boto3.resource('dynamodb') def lambda_handler(event, context): """ DynamoDB put item """ try: table_name = "test-batch" # put items (use Batch Writing) table = DYNAMO.Table(table_name) with table.batch_writer() as batch: batch.put_item( Item={ 'user_name': 'user1', 'blog': 'blog1', 'count': '111', } ) batch.put_item( Item={ 'user_name': 'user1', 'blog': 'blog1', 'count': '222', } ) batch.delete_item( Key={ 'user_name': 'user1', 'blog': 'blog2' } ) batch.put_item( Item={ 'user_name': 'user1', 'blog': 'blog2', 'count': '333', } ) LOGGER.info("Completed registration") return "end" except Exception as error: LOGGER.error(error) raise error
■成功させてみる
19行目を
batch_writer(overwrite_by_pkeys=['user_name', 'blog'])
のように修正して、再実行してみます。
"""This is sample program.""" import logging import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) DYNAMO = boto3.resource('dynamodb') def lambda_handler(event, context): """ DynamoDB put item """ try: table_name = "test-batch" # put items (use Batch Writing) table = DYNAMO.Table(table_name) with table.batch_writer(overwrite_by_pkeys=['user_name', 'blog']) as batch: batch.put_item( Item={ 'user_name': 'user1', 'blog': 'blog1', 'count': '111', } ) batch.put_item( Item={ 'user_name': 'user1', 'blog': 'blog1', 'count': '222', } ) batch.delete_item( Key={ 'user_name': 'user1', 'blog': 'blog2' } ) batch.put_item( Item={ 'user_name': 'user1', 'blog': 'blog2', 'count': '333', } ) LOGGER.info("Completed registration") return "end" except Exception as error: LOGGER.error(error) raise error
想定通りに、BatchWriteItem内で重複したところは上書きされる形で追加されています。
さいごに
いかがだったでしょうか。
batch_writer()
を使って、大量のデータをテーブルに追加する処理や
重複request時の上書きする処理を試してみました。
誰かの参考になれば幸いです。